Flink SQL ব্যবহার করে একটি Data Analytics প্রোজেক্ট তৈরি করা অত্যন্ত কার্যকর, কারণ এটি স্ট্রিম ডেটা প্রসেসিং এবং ব্যাচ ডেটা এনালাইটিক্স উভয়ের জন্য SQL সিনট্যাক্স ব্যবহার করে ডেটা প্রসেসিংকে সহজ করে। Flink SQL ব্যবহার করে ডেটা সোর্স থেকে ডেটা পড়া, ট্রান্সফর্মেশন করা, এবং বিভিন্ন ধরনের এনালাইটিক্স করা সম্ভব। এখানে, আমরা একটি উদাহরণ প্রোজেক্ট তৈরি করব যা একটি রিয়েল-টাইম ডেটা স্ট্রিম প্রসেস করবে এবং SQL ব্যবহার করে কিছু এনালাইটিক্স সম্পাদন করবে।

প্রোজেক্টের উদ্দেশ্য

আমাদের উদাহরণ প্রোজেক্টটি একটি ই-কমার্স সাইটের রিয়েল-টাইম অর্ডার ডেটা প্রসেস করবে। আমরা নিম্নোক্ত কাজগুলো সম্পাদন করব:

  1. প্রতিটি অর্ডার ডেটা স্ট্রিম থেকে পড়া।
  2. প্রতিটি প্রোডাক্টের জন্য বিক্রয় হিসাব করা।
  3. টাইম উইন্ডোর উপর ভিত্তি করে বিক্রয় পরিসংখ্যান বের করা (যেমন প্রতি ১০ সেকেন্ডে বিক্রয় সংক্ষেপ)।
  4. সর্বাধিক বিক্রিত প্রোডাক্ট নির্ধারণ করা।

প্রয়োজনীয় সেটআপ

  • Apache Flink ইন্সটল করতে হবে।
  • Apache Kafka (ঐচ্ছিক) রিয়েল-টাইম স্ট্রিম সোর্স হিসেবে ব্যবহার করা হবে।
  • Flink SQL CLI বা Java API ব্যবহার করে SQL কোয়েরি চালানো হবে।

১. Flink SQL Environment সেটআপ করা

প্রথমে, একটি Flink SQL Environment তৈরি করতে হবে। আমরা এখানে একটি Java API উদাহরণ ব্যবহার করছি, তবে Flink SQL CLI থেকেও একই কাজ করা সম্ভব।

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkSQLAnalytics {
    public static void main(String[] args) {
        // Execution এবং Table Environment তৈরি করা
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Flink SQL কোয়েরি এবং ডেটা প্রসেসিং এখানে হবে
    }
}

২. Data Source রেজিস্ট্রেশন করা (Kafka অথবা অন্য সোর্স)

Flink SQL এ, আমরা একটি Kafka সোর্স ব্যবহার করে অর্ডার ডেটা পড়ব। Kafka এর মাধ্যমে প্রতিটি অর্ডার একটি JSON ফরম্যাটে স্ট্রিম করা হবে।

String kafkaSourceDDL = "CREATE TABLE orders (" +
                        "  order_id STRING," +
                        "  product_id STRING," +
                        "  quantity INT," +
                        "  price DOUBLE," +
                        "  order_time TIMESTAMP(3)," +
                        "  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
                        ") WITH (" +
                        "  'connector' = 'kafka'," +
                        "  'topic' = 'ecommerce_orders'," +
                        "  'properties.bootstrap.servers' = 'localhost:9092'," +
                        "  'format' = 'json'" +
                        ")";

tableEnv.executeSql(kafkaSourceDDL);

বর্ণনা:

  • এখানে, একটি orders টেবিল রেজিস্টার করা হয়েছে যা Kafka থেকে ডেটা পড়ে।
  • WATERMARK ব্যবহার করে order_time ফিল্ডে টাইম উইন্ডো ম্যানেজ করা হয়েছে।

৩. ডেটা প্রসেসিং ও এনালাইটিক্স চালানো

Flink SQL ব্যবহার করে বিভিন্ন এনালাইটিক্স কোয়েরি চালানো হবে।

১. প্রতি প্রোডাক্টের মোট বিক্রয় বের করা:

SELECT product_id, SUM(quantity * price) AS total_sales
FROM orders
GROUP BY product_id;

বর্ণনা: এই কোয়েরি প্রতিটি প্রোডাক্টের জন্য মোট বিক্রয় হিসাব করে।

২. টাইম উইন্ডোর উপর ভিত্তি করে বিক্রয় সংক্ষেপ বের করা:

SELECT 
  product_id, 
  TUMBLE_START(order_time, INTERVAL '10' SECOND) AS window_start,
  TUMBLE_END(order_time, INTERVAL '10' SECOND) AS window_end,
  SUM(quantity * price) AS total_sales
FROM orders
GROUP BY 
  product_id,
  TUMBLE(order_time, INTERVAL '10' SECOND);

বর্ণনা: এই কোয়েরি প্রতিটি ১০ সেকেন্ডের উইন্ডোতে প্রতিটি প্রোডাক্টের বিক্রয় সংক্ষেপ হিসাব করে।

৩. সর্বাধিক বিক্রিত প্রোডাক্ট নির্ধারণ করা:

SELECT product_id, COUNT(order_id) AS order_count
FROM orders
GROUP BY product_id
ORDER BY order_count DESC
LIMIT 1;

বর্ণনা: এই কোয়েরি সর্বাধিক অর্ডার সংখ্যা বিশিষ্ট প্রোডাক্ট বের করে এবং তা সারণী অনুযায়ী সাজায়।

৪. Data Sink ব্যবহার করে আউটপুট সংরক্ষণ করা

প্রসেস করা ডেটাকে Flink SQL ব্যবহার করে Kafka বা অন্য কোনও স্টোরেজ সিস্টেমে পাঠানো যায়। এখানে, আমরা Kafka সিংক ব্যবহার করছি।

String kafkaSinkDDL = "CREATE TABLE result_sink (" +
                      "  product_id STRING," +
                      "  total_sales DOUBLE" +
                      ") WITH (" +
                      "  'connector' = 'kafka'," +
                      "  'topic' = 'processed_sales'," +
                      "  'properties.bootstrap.servers' = 'localhost:9092'," +
                      "  'format' = 'json'" +
                      ")";

tableEnv.executeSql(kafkaSinkDDL);

// প্রক্রিয়াকৃত টেবিলকে সিংকে লেখার জন্য SQL
tableEnv.executeSql("INSERT INTO result_sink SELECT product_id, SUM(quantity * price) AS total_sales FROM orders GROUP BY product_id");

বর্ণনা:

  • result_sink নামে একটি Kafka সিংক টেবিল তৈরি করা হয়েছে, যেখানে প্রক্রিয়াকৃত ডেটা পাঠানো হচ্ছে।
  • INSERT INTO কমান্ড ব্যবহার করে SQL কোয়েরি সিংকে রেজাল্ট পাঠাচ্ছে।

৫. প্রজেক্ট চালানো এবং রেজাল্ট মনিটরিং

Flink এর SQL কোয়েরিগুলো চালানোর পর আপনি Flink এর ড্যাশবোর্ড থেকে টাস্ক এবং ডেটা প্রসেসিং মনিটর করতে পারেন। এছাড়া, আপনি Kafka Consumer ব্যবহার করে প্রক্রিয়াকৃত ডেটা স্ট্রিম দেখতেও পারেন।

# Kafka Consumer দিয়ে আউটপুট মনিটর করা
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic processed_sales --from-beginning

উপসংহার

Apache Flink SQL ব্যবহার করে এই প্রোজেক্টে আমরা দেখলাম কীভাবে একটি রিয়েল-টাইম ডেটা স্ট্রিম থেকে এনালাইটিক্স করা যায়। Flink SQL এর মাধ্যমে সহজেই ডেটা সোর্স রেজিস্টার করে এবং বিভিন্ন ট্রান্সফরমেশন ও অ্যানালাইটিক্স করা সম্ভব। Flink SQL এর ক্ষমতা বড় আকারের ডেটা প্রসেসিং এবং অ্যানালাইটিক্স প্রোজেক্টে অত্যন্ত কার্যকর।

আরও দেখুন...

Promotion